package opmq

import (
	"gitee.com/maomaomaoge/opmq/packets"
	"strings"
	"sync"
)

// subscription 订阅
// - 维护不带通配符的连接: 用map
// - 维护带通配符的连接: 用树
// - 维护带retain的消息,在每个订阅重新连接时候，会进行一遍检索
// - 订阅任务处理
// - 处理订阅报文的结构内存
//
// todo: 需要加上耳机开关量，读写不想影响
type subscription struct {
	mux sync.Mutex

	subNoWild map[string][]*clientConn
	subUnTree *UnorderedTree // 带通配符的

	// 接受任务,默认长度给大点，收到的任务一定是发布方的报文
	subReceive chan *packets.PublishPacket

	Broker *Server
}

func NewSubscription(broker *Server) *subscription {
	s := &subscription{
		subReceive: make(chan *packets.PublishPacket, 1000),
		subUnTree:  NewTree(),
		subNoWild:  make(map[string][]*clientConn),
		Broker:     broker,
	}

	go s.run()
	return s
}

// Add 在订阅里面添加topic和一个连接
//
// Note: topic是不带美化，是纯正的topic，不经过任何修饰的
// todo: 掉线重连
func (s *subscription) Add(topic string, c *clientConn) {
	s.mux.Lock()
	defer s.mux.Unlock()

	if !isWildcard(topic) { // 不带通配符的
		// 掉线重连筛选
		newData := make([]*clientConn, 0)
		cs := s.subNoWild[topic]
		for _, v := range cs {
			if v.Cid != c.Cid {
				newData = append(newData, v)
			}
		}

		s.subNoWild[topic] = newData
		s.subNoWild[topic] = append(s.subNoWild[topic], c)
	} else {
		DEBUG.Println("带通配符添加到订阅: ", topic)
		AddTopic(topic, c, s.subUnTree)
	}

	isHave := false
	for _, v := range c.SubList {
		if v == topic {
			isHave = true
		}
	}
	if !isHave {
		c.SubList = append(c.SubList, topic)
	}

	// 发送缓存数据
	//go c.SendRetain()
}

func isWildcard(topic string) bool {
	if strings.Contains(topic, "#") || strings.Contains(topic, "+") {
		return true
	}
	return false
}

// Remove all subscriptions that refer to a connection.
func (s *subscription) unsubAll(c *clientConn) {
	s.mux.Lock()

	// todo:

	s.mux.Unlock()
}

// Remove the subscription to topic for a given connection.
func (s *subscription) unsub(topic string, c *clientConn) {
	s.mux.Lock()

	if isWildcard(topic) {
		// 删除内存节点
		RemoveNode(topic, s.subUnTree, c)
	} else {
		delete(s.subNoWild, topic)
	}

	// 删除前端视图的topic
	tmp := make([]string, 0)
	for _, v := range c.SubList {
		if topic != v {
			tmp = append(tmp, v)
		}
	}
	c.SubList = tmp

	s.mux.Unlock()
}

func (s *subscription) Submit(pp *packets.PublishPacket) {
	s.subReceive <- pp
}

func (s *subscription) run() {
	for pp := range s.subReceive {
		// 收到订阅报文
		// 先找不带通配符的，再找带通配符的
		go func() {
			pt := pp.TopicName
			DEBUG_TREE.Println("树结构查找pt： ", pt)
			// 只封装这一次报文即可
			packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
			packet.Qos = 0 // 这里给订阅的人直接发布Qos为0，省事
			packet.TopicName = pt
			packet.MessageID = pp.MessageID
			packet.Payload = pp.Payload
			packet.Retain = pp.Retain
			DEBUG_PUBLISH.Println("接受发布数据的qos: ", pp.Qos)

			localHave := false

			cls, ok := s.subNoWild[pt] // 先去没有通配符里面找
			if ok {
				for _, v := range cls {
					v.Submit(packet)
				}

				localHave = true
			}

			//DEBUG.Println("带通配符的数据")
			wildConns := NewTreeData()
			Scan2(pt, s.subUnTree, wildConns)
			if wildConns != nil {
				//DEBUG.Println("找到通配符节点个数: ", len(wildConns.Res), s.subUnTree)
				for _, v := range wildConns.Res {
					if len(v.conns) > 0 {
						localHave = true
					}

					for _, v2 := range v.conns {
						v2.Submit(packet)
					}
				}
			} else {
				DEBUG.Println("没有找到通配符节点")
			}

			// 进行集群转发送数据, 也就是订阅40,发布50的情况
			//CLUSTER_DEBUG.Println("进行发送集群数据")
			if !localHave {
				s.Broker.ClusterSelf.Job <- &ClusterMessage{
					Topic:   pt,
					PayLoad: pp.Payload,
				}
			}
		}()
	}
}
